{
 "metadata": {
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6-final"
  },
  "orig_nbformat": 2,
  "kernelspec": {
   "name": "python3",
   "display_name": "Python 3",
   "language": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2,
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark\n",
    "from pyspark import SparkContext\n",
    "from pyspark import SparkConf"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "conf=SparkConf().setAppName(\"miniProject\").setMaster(\"local\").set(\"spark.executor.memory\",\"3g\")\\\n",
    "        .set(\"spark.executor.instances\",\"2\")\n",
    "sc=SparkContext.getOrCreate(conf)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.mllib.classification import LogisticRegressionWithLBFGS\n",
    "from pyspark.mllib.evaluation import BinaryClassificationMetrics\n",
    "from pyspark.mllib.util import MLUtils"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "output_type": "execute_result",
     "data": {
      "text/plain": [
       "PythonRDD[4] at RDD at PythonRDD.scala:53"
      ]
     },
     "metadata": {},
     "execution_count": 4
    }
   ],
   "source": [
    "data = MLUtils.loadLibSVMFile(sc, \"hdfs://node1:9000/user/root/exp4/procd_train_real\")\n",
    "\n",
    "# Split data into training (60%) and test (40%)\n",
    "training, test = data.randomSplit([0.6, 0.4], seed=11)\n",
    "training.cache()\n"
   ]
  },
  {
   "source": [
    "## Logistic Regression"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Logistic Regression\n",
    "# Run training algorithm to build the model\n",
    "model = LogisticRegressionWithLBFGS.train(training)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "Area under PR = 0.12141015007401622\nArea under ROC = 0.5016382862362231\n"
     ]
    }
   ],
   "source": [
    "# Compute raw scores on the test set\n",
    "predictionAndLabels = test.map(lambda lp: (float(model.predict(lp.features)), lp.label))\n",
    "\n",
    "# Instantiate metrics object\n",
    "metrics = BinaryClassificationMetrics(predictionAndLabels)\n",
    "\n",
    "# Area under precision-recall curve\n",
    "print(\"Area under PR = %s\" % metrics.areaUnderPR)\n",
    "\n",
    "# Area under ROC curve\n",
    "print(\"Area under ROC = %s\" % metrics.areaUnderROC)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "Test Error = 0.0625555970042182\n"
     ]
    }
   ],
   "source": [
    "testErr = predictionAndLabels.filter(lambda lp: lp[0] != lp[1]).count() / float(test.count())\n",
    "print(\"Test Error = \" + str(testErr))"
   ]
  },
  {
   "source": [
    "## SVM"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "# SVM\n",
    "# Build the model\n",
    "from pyspark.mllib.classification import SVMWithSGD, SVMModel\n",
    "svm_model = SVMWithSGD.train(training, iterations=100)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "Training Error = 0.06195103539602219\n"
     ]
    }
   ],
   "source": [
    "# Evaluating the model on training data\n",
    "labelsAndPreds = training.map(lambda p: (p.label, model.predict(p.features)))\n",
    "trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(training.count())\n",
    "print(\"Training Error = \" + str(trainErr))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "Test Error = 0.06154169894879815\n"
     ]
    }
   ],
   "source": [
    "# Evaluating the model on test data\n",
    "labelsAndPreds = test.map(lambda p: (p.label, svm_model.predict(p.features)))\n",
    "testErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(test.count())\n",
    "print(\"Test Error = \" + str(testErr))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# model.save(sc, \"target/tmp/pythonSVMWithSGDModel\")\n",
    "# sameModel = SVMModel.load(sc, \"target/tmp/pythonSVMWithSGDModel\")"
   ]
  },
  {
   "source": [
    "## Decision Tree"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "# DecisionTree\n",
    "from pyspark.mllib.tree import DecisionTree, DecisionTreeModel\n",
    "# Train a DecisionTree model.\n",
    "#  Empty categoricalFeaturesInfo indicates all features are continuous.\n",
    "model = DecisionTree.trainClassifier(training, numClasses=2, categoricalFeaturesInfo={},\n",
    "                                         impurity='gini', maxDepth=5, maxBins=32)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "Test Error = 0.061589524328770795\nLearned classification tree model:\nDecisionTreeModel classifier of depth 5 with 21 nodes\n  If (feature 3 <= 6.5)\n   If (feature 8 <= 1.5)\n    If (feature 3 <= 2.5)\n     If (feature 4 <= 3.5)\n      Predict: 0.0\n     Else (feature 4 > 3.5)\n      Predict: 1.0\n    Else (feature 3 > 2.5)\n     Predict: 0.0\n   Else (feature 8 > 1.5)\n    Predict: 0.0\n  Else (feature 3 > 6.5)\n   If (feature 3 <= 19.5)\n    Predict: 0.0\n   Else (feature 3 > 19.5)\n    If (feature 4 <= 5.5)\n     If (feature 8 <= 8.5)\n      Predict: 0.0\n     Else (feature 8 > 8.5)\n      If (feature 5 <= 1.5)\n       Predict: 0.0\n      Else (feature 5 > 1.5)\n       Predict: 1.0\n    Else (feature 4 > 5.5)\n     If (feature 8 <= 2.5)\n      Predict: 0.0\n     Else (feature 8 > 2.5)\n      If (feature 5 <= 17.5)\n       Predict: 0.0\n      Else (feature 5 > 17.5)\n       Predict: 1.0\n\n"
     ]
    }
   ],
   "source": [
    "# Evaluate model on test instances and compute test error\n",
    "predictions = model.predict(test.map(lambda x: x.features))\n",
    "labelsAndPredictions = test.map(lambda lp: lp.label).zip(predictions)\n",
    "testErr = labelsAndPredictions.filter(\n",
    "    lambda lp: lp[0] != lp[1]).count() / float(test.count())\n",
    "print('Test Error = ' + str(testErr))\n",
    "print('Learned classification tree model:')\n",
    "print(model.toDebugString())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Save and load model\n",
    "# model.save(sc, \"target/tmp/myDecisionTreeClassificationModel\")\n",
    "# sameModel = DecisionTreeModel.load(sc, \"target/tmp/myDecisionTreeClassificationModel\")"
   ]
  },
  {
   "source": [
    "## Naive Bayes"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "import shutil\n",
    "from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel\n",
    "# Train a naive Bayes model.\n",
    "model = NaiveBayes.train(training, 1.0)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "model accuracy 0.8911876954862407\n"
     ]
    }
   ],
   "source": [
    "# Make prediction and test accuracy.\n",
    "predictionAndLabel = test.map(lambda p: (model.predict(p.features), p.label))\n",
    "accuracy = 1.0 * predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count() / test.count()\n",
    "print('model accuracy {}'.format(accuracy))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Save and load model\n",
    "# output_dir = 'target/tmp/myNaiveBayesModel'\n",
    "# shutil.rmtree(output_dir, ignore_errors=True)\n",
    "# model.save(sc, output_dir)\n",
    "# sameModel = NaiveBayesModel.load(sc, output_dir)\n",
    "# predictionAndLabel = test.map(lambda p: (sameModel.predict(p.features), p.label))\n",
    "# accuracy = 1.0 * predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count() / test.count()\n",
    "# print('sameModel accuracy {}'.format(accuracy))"
   ]
  },
  {
   "source": [
    "## Random Forest"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.mllib.tree import RandomForest, RandomForestModel\n",
    "# Train a RandomForest model.\n",
    "#  Empty categoricalFeaturesInfo indicates all features are continuous.\n",
    "#  Note: Use larger numTrees in practice.\n",
    "#  Setting featureSubsetStrategy=\"auto\" lets the algorithm choose.\n",
    "model = RandomForest.trainClassifier(training, numClasses=2, categoricalFeaturesInfo={},\n",
    "                                        numTrees=3, featureSubsetStrategy=\"auto\",\n",
    "                                        impurity='gini', maxDepth=4, maxBins=32)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "Test Error = 0.061551264024792676\nLearned classification forest model:\nTreeEnsembleModel classifier with 3 trees\n\n  Tree 0:\n    If (feature 3 <= 8.5)\n     If (feature 3 <= 2.5)\n      If (feature 3 <= 1.5)\n       Predict: 0.0\n      Else (feature 3 > 1.5)\n       If (feature 6 <= 10.5)\n        Predict: 0.0\n       Else (feature 6 > 10.5)\n        Predict: 1.0\n     Else (feature 3 > 2.5)\n      Predict: 0.0\n    Else (feature 3 > 8.5)\n     If (feature 6 <= 8.5)\n      Predict: 0.0\n     Else (feature 6 > 8.5)\n      If (feature 8 <= 3.5)\n       Predict: 0.0\n      Else (feature 8 > 3.5)\n       If (feature 5 <= 11.5)\n        Predict: 0.0\n       Else (feature 5 > 11.5)\n        Predict: 1.0\n  Tree 1:\n    If (feature 8 <= 1.5)\n     Predict: 0.0\n    Else (feature 8 > 1.5)\n     If (feature 3 <= 18.5)\n      Predict: 0.0\n     Else (feature 3 > 18.5)\n      If (feature 8 <= 8.5)\n       Predict: 0.0\n      Else (feature 8 > 8.5)\n       If (feature 6 <= 11.5)\n        Predict: 0.0\n       Else (feature 6 > 11.5)\n        Predict: 1.0\n  Tree 2:\n    If (feature 4 <= 5.5)\n     Predict: 0.0\n    Else (feature 4 > 5.5)\n     If (feature 6 <= 9.5)\n      Predict: 0.0\n     Else (feature 6 > 9.5)\n      If (feature 8 <= 4.5)\n       Predict: 0.0\n      Else (feature 8 > 4.5)\n       If (feature 5 <= 17.5)\n        Predict: 0.0\n       Else (feature 5 > 17.5)\n        Predict: 1.0\n\n"
     ]
    }
   ],
   "source": [
    "# Evaluate model on test instances and compute test error\n",
    "predictions = model.predict(test.map(lambda x: x.features))\n",
    "labelsAndPredictions = test.map(lambda lp: lp.label).zip(predictions)\n",
    "testErr = labelsAndPredictions.filter(\n",
    "    lambda lp: lp[0] != lp[1]).count() / float(test.count())\n",
    "print('Test Error = ' + str(testErr))\n",
    "print('Learned classification forest model:')\n",
    "print(model.toDebugString())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Save and load model\n",
    "# model.save(sc, \"target/tmp/myRandomForestClassificationModel\")\n",
    "# sameModel = RandomForestModel.load(sc, \"target/tmp/myRandomForestClassificationModel\")"
   ]
  },
  {
   "source": [
    "## Gradient Boosted Trees"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel\n",
    "# Train a GradientBoostedTrees model.\n",
    "#  Notes: (a) Empty categoricalFeaturesInfo indicates all features are continuous.\n",
    "#         (b) Use more iterations in practice.\n",
    "model = GradientBoostedTrees.trainClassifier(training,\n",
    "                                                categoricalFeaturesInfo={}, numIterations=3)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "Test Error = 0.06154169894879815\nLearned classification GBT model:\nTreeEnsembleModel classifier with 3 trees\n\n  Tree 0:\n    If (feature 3 <= 6.5)\n     If (feature 8 <= 1.5)\n      If (feature 3 <= 2.5)\n       Predict: -0.9135673567356736\n      Else (feature 3 > 2.5)\n       Predict: -0.8897163920860603\n     Else (feature 8 > 1.5)\n      If (feature 8 <= 2.5)\n       Predict: -0.8504097017371354\n      Else (feature 8 > 2.5)\n       Predict: -0.8043789645999591\n    Else (feature 3 > 6.5)\n     If (feature 3 <= 19.5)\n      If (feature 8 <= 1.5)\n       Predict: -0.8410674825769165\n      Else (feature 8 > 1.5)\n       Predict: -0.7775098640946954\n     Else (feature 3 > 19.5)\n      If (feature 4 <= 5.5)\n       Predict: -0.7669565217391304\n      Else (feature 4 > 5.5)\n       Predict: -0.6267470539873938\n  Tree 1:\n    If (feature 4 <= 2.5)\n     If (feature 8 <= 1.5)\n      If (feature 0 <= 3.5)\n       Predict: -0.3896669891412022\n      Else (feature 0 > 3.5)\n       Predict: -0.3561260039709395\n     Else (feature 8 > 1.5)\n      If (feature 6 <= 10.5)\n       Predict: -0.3209969044766414\n      Else (feature 6 > 10.5)\n       Predict: 0.3236583479644418\n    Else (feature 4 > 2.5)\n     If (feature 6 <= 8.5)\n      If (feature 5 <= 3.5)\n       Predict: -0.29585016488201826\n      Else (feature 5 > 3.5)\n       Predict: -0.1930553552709616\n     Else (feature 6 > 8.5)\n      If (feature 8 <= 4.5)\n       Predict: -0.08339703455331733\n      Else (feature 8 > 4.5)\n       Predict: 0.24800276880190228\n  Tree 2:\n    If (feature 4 <= 2.5)\n     If (feature 8 <= 1.5)\n      If (feature 0 <= 3.5)\n       Predict: -0.353087440725917\n      Else (feature 0 > 3.5)\n       Predict: -0.3225447163709254\n     Else (feature 8 > 1.5)\n      If (feature 6 <= 10.5)\n       Predict: -0.2873597307189548\n      Else (feature 6 > 10.5)\n       Predict: 0.28550505210351246\n    Else (feature 4 > 2.5)\n     If (feature 5 <= 5.5)\n      If (feature 8 <= 3.5)\n       Predict: -0.26588539767682656\n      Else (feature 8 > 3.5)\n       Predict: -0.1252120399259314\n     Else (feature 5 > 5.5)\n      If (feature 4 <= 11.5)\n       Predict: -0.12515062378376687\n      Else (feature 4 > 11.5)\n       Predict: 0.20312568681343396\n\n"
     ]
    }
   ],
   "source": [
    "# Evaluate model on test instances and compute test error\n",
    "predictions = model.predict(test.map(lambda x: x.features))\n",
    "labelsAndPredictions = test.map(lambda lp: lp.label).zip(predictions)\n",
    "testErr = labelsAndPredictions.filter(\n",
    "    lambda lp: lp[0] != lp[1]).count() / float(test.count())\n",
    "print('Test Error = ' + str(testErr))\n",
    "print('Learned classification GBT model:')\n",
    "print(model.toDebugString())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Save and load model\n",
    "# model.save(sc, \"target/tmp/myGradientBoostingClassificationModel\")\n",
    "# sameModel = GradientBoostedTreesModel.load(sc,\n",
    "#                                             \"target/tmp/myGradientBoostingClassificationModel\")"
   ]
  },
  {
   "source": [
    "# Don't forget to stop the SparkContext!"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [],
   "source": [
    "sc.stop()"
   ]
  }
 ]
}